import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class Connect{

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> streamSource01 = env.fromElements("aa", "bb", "aa", "dd", "dd");
        DataStreamSource<Integer> streamSource02 = env.fromElements(11,22,33,22,11);
        ConnectedStreams<String, Integer> connectedStreams = streamSource01.connect(streamSource02);

        SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator = connectedStreams.map(new CoMapFunction<String, Integer, Tuple2<String, Integer>>() {

            //处理第一个流数据
            @Override
            public Tuple2<String, Integer> map1(String str) throws Exception {
                return Tuple2.of(str,1);
            }
            //处理第二个流数据
            @Override
            public Tuple2<String, Integer> map2(Integer num) throws Exception {
                return Tuple2.of(String.valueOf(num),1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = outputStreamOperator.keyBy(0).sum(1);

        sum.print();

        env.execute("ConnectDemo");
    }
}

//代码来自:
//https://msd.misuland.com/pd/4146263983340391503